跳到主要内容

Redis 缓存一致性相关

缓存一致性概述

缓存一致性是指缓存中的数据与数据库中的数据保持同步的程度。在分布式系统中,缓存一致性是一个关键挑战,需要在性能、一致性和可用性之间做出权衡。

  • 写透缓存: 数据安全第一,适合金融等关键业务
  • 绕写缓存: 避免缓存污染,适合写多读少场景
  • 写回缓存: 性能至上,适合可容忍数据丢失的场景
  • 异步更新: 用户体验优先,适合大部分互联网应用

如下图所示

缓存一致性策略执行时序对比

🛡️ 写透缓存 (Write Through)

核心思路: 同时写缓存和数据库,保证强一致性

func WriteThrough(key string, value interface{}) error {
// 1. 先写数据库
if err := database.Set(key, value); err != nil {
return err
}

// 2. 再写缓存
if err := cache.Set(key, value); err != nil {
// 写缓存失败,回滚数据库
database.Delete(key)
return err
}

return nil
}

func Read(key string) (interface{}, error) {
// 优先从缓存读取
if value, err := cache.Get(key); err == nil {
return value, nil
}

// 缓存未命中,从数据库读取
return database.Get(key)
}

// 使用场景:转账操作
func TransferMoney(from, to string, amount float64) error {
fromBalance := getBalance(from) - amount
toBalance := getBalance(to) + amount

// 强一致性要求,使用写透模式
WriteThrough(fmt.Sprintf("balance:%s", from), fromBalance)
WriteThrough(fmt.Sprintf("balance:%s", to), toBalance)
return nil
}

🔄 绕写缓存 (Write Around)

核心思路: 写操作绕过缓存,直接写数据库

它和上面的写透缓存的区别在于,写操作不更新缓存,而是删除缓存中的旧数据。这样可以避免缓存污染,适合写多读少的场景。

执行顺序策略详解

注意,在绕写缓存模式下,还需要确定数据库操作和缓存删除的执行顺序:

方案分析:

  1. 先更新数据库,后删除缓存(推荐)

    • ✅ 优点:即使删除缓存失败,下次读取时会重新加载正确数据
    • ❌ 缺点:可能存在短暂的数据不一致
  2. 先删除缓存,后更新数据库

    • ✅ 优点:避免了脏数据
    • ❌ 缺点:如果更新数据库失败,会导致缓存空白期
  3. 双删策略:删除缓存 → 更新数据库 → 延时删除缓存(双删是为了解决上面 “先删除缓存,后更新数据库” 的问题)

func WriteAround(key string, value interface{}) error {
// 1. 直接写数据库
if err := database.Set(key, value); err != nil {
return err
}

// 2. 删除缓存中的旧数据
cache.Delete(key)
return nil
}

// 双删策略实现
func WriteAroundWithDoubleDelete(key string, value interface{}) error {
// 1. 预删除缓存
cache.Delete(key)

// 2. 更新数据库
if err := database.Set(key, value); err != nil {
return err
}

// 3. 延时删除缓存(处理并发读写)
go func() {
time.Sleep(500 * time.Millisecond)
cache.Delete(key)
}()

return nil
}

func Read(key string) (interface{}, error) {
// 尝试从缓存读取
if value, err := cache.Get(key); err == nil {
return value, nil
}

// 缓存未命中,从数据库读取并缓存
value, err := database.Get(key)
if err == nil {
cache.Set(key, value) // 懒加载
}
return value, err
}

// 使用场景:库存更新
func UpdateInventory(productID string, quantity int) error {
// 库存变化频繁但查询相对较少,避免缓存污染
return WriteAround(fmt.Sprintf("inventory:%s", productID), quantity)
}

双删策略是为什么?

双删策略是为了解决并发读写导致的数据不一致问题,假设我们采用"先删除缓存,后更新数据库"的策略,在高并发情况下可能出现这种时序:

提示

注意如果 "先更新数据库,后删除缓存"策略天然避免了双删要解决的问题,它不需要再用双删这种方案,具体什么时候使用先删缓存,看下一节的说明

下面是时序图

双删策略解决方案

什么时候需要"先删缓存"?

从上面看"先删缓存,后更新DB"好像没有必要,但它其实是为了解决一个特定问题:避免缓存中出现脏数据。

考虑这种极端并发情况("先更新DB,后删缓存"):

所以 "先删缓存"的设计思路

下面是实际应用场景

1、金融场景 - 余额查询

// 转账操作:绝对不能基于过期余额做判断
func Transfer(fromUser, toUser string, amount float64) error {
fromKey := fmt.Sprintf("balance:%s", fromUser)

// 先删除缓存,确保后续读取都是最新余额
cache.Delete(fromKey)

// 读取最新余额(必须从数据库)
balance := getBalanceFromDB(fromUser)
if balance < amount {
return errors.New("余额不足")
}

// 执行转账
return executeTransfer(fromUser, toUser, amount)
}

2、库存扣减场景

// 商品下单:不能基于过期库存判断
func CreateOrder(productID string, quantity int) error {
stockKey := fmt.Sprintf("stock:%s", productID)

// 删除缓存,强制读取实时库存
cache.Delete(stockKey)

stock := getStockFromDB(productID)
if stock < quantity {
return errors.New("库存不足")
}

// 扣减库存
return updateStock(productID, stock-quantity)
}

但大多数情况下确实"多此一举"

  1. 性能代价高:每次写操作都强制后续读走数据库
  2. 复杂度增加:需要双删策略处理并发问题
  3. 收益有限:多数业务能容忍短暂的数据延迟

现实中的最佳实践

// 99%的场景:简单可靠的方案
func UpdateUser(userID int64, userData User) error {
// 1. 更新数据库(数据源)
if err := database.Update(userID, userData); err != nil {
return err
}

// 2. 删除缓存(失败也无所谓)
cache.Delete(fmt.Sprintf("user:%d", userID))

return nil
}

// 1%的场景:强一致性要求
func UpdateCriticalData(key string, data interface{}) error {
// 使用分布式锁 + 写透模式
mutex := acquireDistributedLock(key)
defer mutex.Unlock()

// 同时更新DB和缓存
return writeThrough(key, data)
}

🚀 写回缓存 (Write Back)

核心思路: 只写缓存,定期批量写回数据库

type WriteBackCache struct {
dirtyKeys map[string]bool
ticker *time.Ticker
}

func (w *WriteBackCache) Set(key string, value interface{}) error {
// 1. 写入缓存
if err := cache.Set(key, value); err != nil {
return err
}

// 2. 标记为脏数据
w.dirtyKeys[key] = true
return nil
}

// 定期刷新到数据库
func (w *WriteBackCache) FlushToDB() {
for key := range w.dirtyKeys {
if value, err := cache.Get(key); err == nil {
database.Set(key, value)
}
delete(w.dirtyKeys, key)
}
}

// 使用场景:用户行为记录
func RecordUserAction(userID string, action UserAction) error {
// 高频写入,使用写回模式提升性能
key := fmt.Sprintf("user_action:%s", userID)
return writeBackCache.Set(key, action)
}

⏰ 异步更新 (Async Update)

核心思路: 立即写数据库,异步更新缓存

func AsyncUpdate(key string, value interface{}) error {
// 1. 立即写数据库
if err := database.Set(key, value); err != nil {
return err
}

// 2. 异步更新缓存
go func() {
cache.Set(key, value)
}()

return nil
}

func Read(key string) (interface{}, error) {
// 先读缓存
if value, err := cache.Get(key); err == nil {
return value, nil
}

// 缓存未命中,读数据库
value, err := database.Get(key)
if err == nil {
// 异步更新缓存
go cache.Set(key, value)
}
return value, err
}

// 使用场景:推荐系统
func UpdateRecommendations(userID string, items []string) error {
// 允许短暂不一致,优先保证响应速度
key := fmt.Sprintf("recommendations:%s", userID)
return AsyncUpdate(key, items)
}

分布式锁与一致性

在分布式环境下,多个实例同时更新同一数据可能导致缓存不一致,需要使用分布式锁来保证操作的原子性:

type DistributedCache struct {
redis *redis.Client
db *sql.DB
locker *redsync.Mutex
}

func (dc *DistributedCache) UpdateUserWithLock(userID int64, updateData map[string]interface{}) error {
lockKey := fmt.Sprintf("lock:user:%d", userID)

// 获取分布式锁
mutex := dc.locker.NewMutex(lockKey, redsync.WithExpiry(10*time.Second))
if err := mutex.Lock(); err != nil {
return fmt.Errorf("failed to acquire lock: %v", err)
}
defer mutex.Unlock()

// 在锁保护下进行数据更新
return dc.atomicUpdate(userID, updateData)
}

func (dc *DistributedCache) atomicUpdate(userID int64, updateData map[string]interface{}) error {
userKey := fmt.Sprintf("user:%d", userID)

// 1. 从数据库读取最新数据
user, err := dc.getUserFromDB(userID)
if err != nil {
return err
}

// 2. 应用更新
for field, value := range updateData {
dc.applyUpdate(user, field, value)
}

// 3. 原子性更新数据库和缓存
tx, err := dc.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

if err := dc.updateUserInTx(tx, user); err != nil {
return err
}

if err := dc.setUserCache(userKey, user); err != nil {
return err
}

return tx.Commit()
}

缓存穿透、击穿、雪崩

缓存穿透解决方案

// 使用布隆过滤器防止缓存穿透
type BloomFilterCache struct {
redis *redis.Client
bloomFilter *bloom.BloomFilter
}

func (bfc *BloomFilterCache) GetUser(userID int64) (*User, error) {
// 1. 布隆过滤器检查
if !bfc.bloomFilter.Test([]byte(fmt.Sprintf("user:%d", userID))) {
return nil, ErrUserNotExists
}

// 2. 查询缓存
key := fmt.Sprintf("user:%d", userID)
if cached := bfc.redis.Get(ctx, key).Val(); cached != "" {
var user User
json.Unmarshal([]byte(cached), &user)
return &user, nil
}

// 3. 查询数据库
user, err := bfc.queryUserFromDB(userID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// 缓存空值,防止穿透
bfc.redis.Set(ctx, key, "null", 5*time.Minute)
}
return nil, err
}

// 4. 更新缓存和布隆过滤器
bfc.setUserCache(key, user)
return user, nil
}

缓存击穿解决方案

// 使用互斥锁防止缓存击穿
type MutexCache struct {
redis *redis.Client
db *sql.DB
locks sync.Map // 本地锁映射
}

func (mc *MutexCache) GetHotData(key string) (interface{}, error) {
// 1. 尝试从缓存获取
if data := mc.redis.Get(ctx, key).Val(); data != "" {
return data, nil
}

// 2. 获取互斥锁
lockKey := "mutex:" + key
lock, _ := mc.locks.LoadOrStore(lockKey, &sync.Mutex{})
mutex := lock.(*sync.Mutex)

mutex.Lock()
defer mutex.Unlock()

// 3. 双重检查
if data := mc.redis.Get(ctx, key).Val(); data != "" {
return data, nil
}

// 4. 查询数据库并更新缓存
data, err := mc.queryFromDB(key)
if err != nil {
return nil, err
}

// 5. 设置缓存,添加随机过期时间防止雪崩
expiry := time.Duration(300+rand.Intn(60)) * time.Second
mc.redis.Set(ctx, key, data, expiry)

return data, nil
}

缓存雪崩解决方案

// 多级缓存 + 随机过期时间
type MultiLevelCache struct {
l1Cache *sync.Map // 本地缓存
l2Cache *redis.Client // Redis缓存
db *sql.DB
circuit *hystrix.Circuit // 熔断器
}

func (mlc *MultiLevelCache) Get(key string) (interface{}, error) {
// L1: 本地缓存
if value, exists := mlc.l1Cache.Load(key); exists {
return value, nil
}

// L2: Redis缓存
if value := mlc.l2Cache.Get(ctx, key).Val(); value != "" {
// 异步更新L1缓存
go mlc.l1Cache.Store(key, value)
return value, nil
}

// 熔断器保护数据库
return mlc.circuit.Execute(func() (interface{}, error) {
return mlc.queryFromDBWithRandomExpiry(key)
})
}

func (mlc *MultiLevelCache) queryFromDBWithRandomExpiry(key string) (interface{}, error) {
data, err := mlc.queryFromDB(key)
if err != nil {
return nil, err
}

// 随机过期时间 (基础时间 + 随机时间)
baseExpiry := 30 * time.Minute
randomExpiry := time.Duration(rand.Intn(600)) * time.Second
totalExpiry := baseExpiry + randomExpiry

// 更新多级缓存
mlc.l2Cache.Set(ctx, key, data, totalExpiry)
mlc.l1Cache.Store(key, data)

return data, nil
}

最终一致性实现

// 基于消息队列的最终一致性
type EventualConsistencyCache struct {
redis *redis.Client
db *sql.DB
producer kafka.Producer
}

func (ecc *EventualConsistencyCache) UpdateUserAsync(user *User) error {
key := fmt.Sprintf("user:%d", user.ID)

// 1. 立即更新缓存
if err := ecc.setUserCache(key, user); err != nil {
return err
}

// 2. 发送异步更新消息
event := UpdateEvent{
Type: "user_update",
UserID: user.ID,
Data: user,
Timestamp: time.Now(),
Version: user.Version,
}

return ecc.producer.Send("user-updates", event)
}

// 消息消费者处理数据库更新
func (ecc *EventualConsistencyCache) ProcessUpdateEvent(event UpdateEvent) error {
// 1. 获取分布式锁
lockKey := fmt.Sprintf("lock:user:%d", event.UserID)
mutex := ecc.redis.SetNX(ctx, lockKey, "1", 10*time.Second)
if !mutex.Val() {
return fmt.Errorf("failed to acquire lock")
}
defer ecc.redis.Del(ctx, lockKey)

// 2. 版本检查(乐观锁)
currentUser, err := ecc.getUserFromDB(event.UserID)
if err != nil {
return err
}

if currentUser.Version >= event.Version {
// 版本过期,忽略此更新
return nil
}

// 3. 更新数据库
if err := ecc.updateUserInDB(event.Data); err != nil {
// 更新失败,重置缓存到数据库状态
key := fmt.Sprintf("user:%d", event.UserID)
ecc.setUserCache(key, currentUser)
return err
}

return nil
}

缓存一致性监控

// 缓存一致性监控
type ConsistencyMonitor struct {
redis *redis.Client
db *sql.DB
metrics *prometheus.Registry

// 监控指标
inconsistencyCounter prometheus.Counter
latencyHistogram prometheus.Histogram
}

func (cm *ConsistencyMonitor) CheckConsistency(key string) (*ConsistencyReport, error) {
report := &ConsistencyReport{
Key: key,
CheckTime: time.Now(),
}

// 并行获取缓存和数据库数据
var cacheData, dbData interface{}
var cacheErr, dbErr error

done := make(chan bool, 2)

go func() {
cacheData, cacheErr = cm.getFromCache(key)
done <- true
}()

go func() {
dbData, dbErr = cm.getFromDB(key)
done <- true
}()

// 等待两个操作完成
<-done
<-done

// 比较数据一致性
report.Consistent = cm.compareData(cacheData, dbData)
report.CacheError = cacheErr
report.DBError = dbErr

if !report.Consistent {
cm.inconsistencyCounter.Inc()
// 自动修复(可选)
if cm.shouldAutoFix(key) {
cm.fixInconsistency(key, dbData)
}
}

return report, nil
}

缓存一致性最佳实践

  1. 合理设置过期时间:避免过长的缓存时间导致数据陈旧
  2. 使用版本号:实现乐观锁,处理并发更新
  3. 监控和告警:建立完善的监控体系
  4. 渐进式更新:大批量数据更新时采用分批处理
  5. 降级策略:缓存故障时的兜底方案
# 缓存一致性分析工具
redis-cli --eval consistency-check.lua keys , args

# 监控缓存命中率
redis-cli info stats | grep hit_rate

# 分析缓存热点
redis-cli --hotkeys